[ML] Ensure queued AbstractRunnables are notified when executor stops #135966
AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs.
- Refactor notifyQueueRunnables() to allow PriorityProcessWorkerExecutorService to notify the AbstractRunnable contained within queued OrderedRunnables
- Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start()
- Add unit tests
Closes elastic#134651
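For readers who want to see the failure mode in isolation, here is a minimal sketch of it. It does not use the real Elasticsearch classes: `Task`, `Wrapper`, and `DrainDemo` are hypothetical stand-ins for AbstractRunnable, the pre-fix OrderedRunnable, and the draining logic, and the real code may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for AbstractRunnable: a Runnable with a failure callback.
abstract class Task implements Runnable {
    abstract void onFailure(Exception e);
}

// Hypothetical stand-in for the pre-fix OrderedRunnable: it wraps a Task but is not a Task itself.
record Wrapper(long order, Runnable inner) implements Runnable {
    @Override
    public void run() {
        inner.run();
    }
}

class DrainDemo {
    // Mirrors the flawed assumption: only queue elements that are Tasks themselves are notified.
    static int drainAndNotify(BlockingQueue<Runnable> queue, Exception cause) {
        List<Runnable> drained = new ArrayList<>();
        queue.drainTo(drained);
        int notified = 0;
        for (Runnable r : drained) {
            if (r instanceof Task task) {
                task.onFailure(cause);
                notified++;
            }
            // A Wrapper falls through here, so the Task inside it never learns the executor stopped.
        }
        return notified;
    }

    // Helper that builds a Task which records what happened to it.
    static Task newTask(List<String> log, String name) {
        return new Task() {
            @Override
            public void run() {
                log.add(name + " ran");
            }

            @Override
            void onFailure(Exception e) {
                log.add(name + " failed: " + e.getMessage());
            }
        };
    }
}
```

If one task is queued directly and another inside a `Wrapper`, only the direct one receives `onFailure`. A caller waiting on the wrapped task's callback would wait indefinitely, which is the kind of hang described above.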
Pinging @elastic/ml-core (Team:ML)
Hi @DonalEvans, I've created a changelog YAML for you.
jonathan-buttner left a comment
Great work, left one suggestion
    }
}

protected abstract void notifyIfAbstractRunnable(T runnable, Exception ex, String msg);
What do you think about having AbstractProcessWorkerExecutorService contain the logic for notifyIfAbstractRunnable and rely on an abstract method like getAsAbstractRunnable, which returns either the AbstractRunnable or null? That way child classes don't need to call back into the parent to do the notification; they would only return the AbstractRunnable if the queued item is (or contains) one.
Something like:
protected abstract AbstractRunnable getAsAbstractRunnable(T runnable);

private void notifyIfAbstractRunnable(T runnable, Exception ex, String msg) {
    var abstractRunnable = getAsAbstractRunnable(runnable);
    if (abstractRunnable != null) {
        notifyAbstractRunnable(ex, msg, abstractRunnable);
    }
}
Then PriorityProcessWorkerExecutorService would have something like:
@Override
protected AbstractRunnable getAsAbstractRunnable(OrderedRunnable orderedRunnable) {
    // The runnable contained within OrderedRunnable is always an AbstractRunnable, so no need to check the type
    return orderedRunnable.runnable();
}
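To make the shape of this suggestion concrete, here is a self-contained sketch that compiles on its own. None of the names are the real Elasticsearch types: `FailableTask`, `BaseWorker`, `Prioritised`, `PriorityWorker`, and `PlainWorker` are hypothetical stand-ins, and the counting return value exists only so the behaviour is easy to observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for AbstractRunnable.
abstract class FailableTask implements Runnable {
    abstract void onFailure(Exception e);
}

// Hypothetical stand-in for the parent executor: it owns the queue and all notification logic.
abstract class BaseWorker<T extends Runnable> {
    protected final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    // Subclasses only answer: which FailableTask, if any, does this queued element carry?
    protected abstract FailableTask getAsFailableTask(T queued);

    // Drains the queue and notifies every task the subclass can extract; returns how many were notified.
    final int notifyQueued(Exception cause) {
        List<T> drained = new ArrayList<>();
        queue.drainTo(drained);
        int notified = 0;
        for (T queued : drained) {
            FailableTask task = getAsFailableTask(queued);
            if (task != null) {
                task.onFailure(cause);
                notified++;
            }
        }
        return notified;
    }
}

// Hypothetical wrapper that always carries a FailableTask.
record Prioritised(int priority, FailableTask inner) implements Runnable {
    @Override
    public void run() {
        inner.run();
    }
}

class PriorityWorker extends BaseWorker<Prioritised> {
    @Override
    protected FailableTask getAsFailableTask(Prioritised queued) {
        return queued.inner(); // always a FailableTask, so no type check is needed
    }
}

class PlainWorker extends BaseWorker<Runnable> {
    @Override
    protected FailableTask getAsFailableTask(Runnable queued) {
        return queued instanceof FailableTask task ? task : null;
    }
}
```

The design point is that the subclasses contain no notification code at all; they only unwrap. A plain `Runnable` in `PlainWorker` yields null and is skipped, while every element in `PriorityWorker` is notified.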
Good idea, this is better
   * most cases will be the insertion order
   */
- public record OrderedRunnable(RequestPriority priority, long tieBreaker, Runnable runnable)
+ protected record OrderedRunnable(RequestPriority priority, long tieBreaker, AbstractRunnable runnable)
Another option is to make OrderedRunnable extend AbstractRunnable and implement the required methods by delegating to the runnable parameter.
This solution adds more code than the original one, but I think it might be better, since it simplifies the logic for determining what we need to notify.
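A rough sketch of what this delegating approach could look like follows. It is not the merged code: `CallbackTask` is a simplified, hypothetical stand-in for AbstractRunnable (the real class has more hooks), `OrderedTask` stands in for OrderedRunnable, and the integer priority is an assumption made to keep the example short. A Java record cannot extend a class, so the wrapper is written as an ordinary class here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Simplified, hypothetical stand-in for AbstractRunnable.
abstract class CallbackTask implements Runnable {
    abstract void doRun() throws Exception;

    abstract void onFailure(Exception e);

    void onRejection(Exception e) {
        onFailure(e);
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception e) {
            onFailure(e);
        }
    }
}

// Hypothetical stand-in for OrderedRunnable: it is a CallbackTask and forwards every callback.
final class OrderedTask extends CallbackTask implements Comparable<OrderedTask> {
    private final int priority; // assumption: lower value means more urgent
    private final long tieBreaker;
    private final CallbackTask inner;

    OrderedTask(int priority, long tieBreaker, CallbackTask inner) {
        this.priority = priority;
        this.tieBreaker = tieBreaker;
        this.inner = inner;
    }

    @Override
    void doRun() {
        inner.run(); // the inner task handles its own exceptions in run()
    }

    @Override
    void onFailure(Exception e) {
        inner.onFailure(e);
    }

    @Override
    void onRejection(Exception e) {
        inner.onRejection(e);
    }

    @Override
    public int compareTo(OrderedTask other) {
        int byPriority = Integer.compare(priority, other.priority);
        return byPriority != 0 ? byPriority : Long.compare(tieBreaker, other.tieBreaker);
    }
}

// Small recording task used to observe which callbacks fire.
class RecordingTask extends CallbackTask {
    final List<String> events = new ArrayList<>();

    @Override
    void doRun() {
        events.add("ran");
    }

    @Override
    void onFailure(Exception e) {
        events.add("failure: " + e.getMessage());
    }

    @Override
    void onRejection(Exception e) {
        events.add("rejection: " + e.getMessage());
    }
}

class QueueNotifier {
    // The type check now succeeds for wrapped tasks too, so no per-subclass unwrapping is needed.
    static <T extends Runnable> int notifyQueued(BlockingQueue<T> queue, Exception cause) {
        List<T> drained = new ArrayList<>();
        queue.drainTo(drained);
        int notified = 0;
        for (T queued : drained) {
            if (queued instanceof CallbackTask task) {
                task.onFailure(cause);
                notified++;
            }
        }
        return notified;
    }
}
```

The trade-off matches the comment above: the wrapper gains a few forwarding methods, but the draining code needs only a single type check and no longer has to know about wrappers.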
…elastic#135966)
AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs.
- Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps
- Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start()
- Add unit tests
- Update docs/changelog/135966.yaml
Closes elastic#134651
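The change to start() in this commit message is described only in words, so here is one way the idea could be sketched. This is an assumption about the general pattern (cleanup in a finally block), not the merged implementation: `LoopingWorker` and its fields are hypothetical, and its loop is simplified to exit when the queue is empty instead of polling until shutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical worker whose processing loop may be aborted by an unexpected exception.
class LoopingWorker {
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    final AtomicBoolean shutDown = new AtomicBoolean(false);
    int notified = 0;

    // Simplified loop: runs queued work until the queue is empty or a task throws.
    void start() {
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run(); // an unchecked exception here escapes the loop
            }
        } finally {
            // Runs on normal exit and on exception, so queued work is never silently abandoned.
            shutDown.set(true);
            notifyQueued();
        }
    }

    // Stand-in for notifyQueueRunnables(): drains whatever is left and counts it as notified.
    void notifyQueued() {
        List<Runnable> drained = new ArrayList<>();
        queue.drainTo(drained);
        notified += drained.size();
    }
}
```

With three queued tasks where the first throws, the exception still propagates to the caller of `start()`, but the worker is marked as shut down and the two remaining tasks are drained and accounted for.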
💔 Backport failed
…elastic#135966) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes elastic#134651 (cherry picked from commit d3d013e)
💚 All backports created successfully
…#135966) (#136122) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651
…#135966) (#136126) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)
…#135966) (#136128) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)
…#135966) (#136125) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)
…#135966) (#136127) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)